package com.learning.rocketmq.demo;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;

/**
 * 消费者消费消息监听并路由至对应消息处理
 */
@Component
@Slf4j
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {

    @Autowired(required = false)
    private Map<String, MQMsgProcessor> mqMsgProcessorServiceMap;

    /**
     * 默认 msgs 里只有一条消息，可以通过设置consumeMessageBatchMaxSize参数来批量接收消息<br/>
     * 不要抛异常，如果没有return CONSUME_SUCCESS ，consumer会重新消费该消息，直到return CONSUME_SUCCESS
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        if (CollectionUtils.isEmpty(msgs)) {
            log.info("接受到的消息为空，不处理，直接返回成功");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        ConsumeConcurrentlyStatus concurrentlyStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        try {
            //根据Topic分组
            Map<String, List<MessageExt>> topicGroups = msgs.stream().collect(Collectors.groupingBy(MessageExt::getTopic));
            for (Entry<String, List<MessageExt>> topicEntry : topicGroups.entrySet()) {
                String topic = topicEntry.getKey();
                //根据tags分组
//                Map<String, List<MessageExt>> tagGroups = topicEntry.getValue().stream().collect(Collectors.groupingBy(MessageExt::getTags));
                Map<String, List<MessageExt>> tagGroups = topicEntry.getValue().stream().filter(item -> StringUtils.isNotBlank(item.getTags())).collect(Collectors.groupingBy(MessageExt::getTags));
                for (Entry<String, List<MessageExt>> tagEntry : tagGroups.entrySet()) {
                    String tag = tagEntry.getKey();
                    //消费某个主题下，tag的消息
                    this.consumeMsgForTag(topic, tag, tagEntry.getValue());
                }
            }
        } catch (Exception e) {
            log.error("处理消息失败", e);
            concurrentlyStatus = ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        // 如果没有return success ，consumer会重新消费该消息，直到return success
        return concurrentlyStatus;
    }

    /**
     * 根据topic 和 tags路由，查找消费消息服务
     *
     * @param topic
     * @param tag
     * @param value
     */
    private void consumeMsgForTag(String topic, String tag, List<MessageExt> value) {
        //根据topic 和  tag查询具体的消费服务
        MQMsgProcessor imqMsgProcessor = selectConsumeService(topic, tag);

        try {
            if (imqMsgProcessor == null) {
                log.error(String.format("根据Topic：%s和Tag:%s 没有找到对应的处理消息的服务", topic, tag));

            } else {
                log.info(String.format("根据Topic：%s和Tag:%s 路由到的服务为:%s，开始调用处理消息", topic, tag, imqMsgProcessor.getClass().getName()));
                //调用该类的方法,处理消息
                MQConsumeResult mqConsumeResult = imqMsgProcessor.handle(topic, tag, value);
                if (mqConsumeResult != null && mqConsumeResult.isSuccess()) {
                    log.info("消息处理成功：" + JSON.toJSONString(mqConsumeResult));
                } else {
                    log.error("消费消息失败");
                }
            }
        } catch (Exception e) {
            log.error("消费消息失败" + e);
        }
    }

    /**
     * 根据topic和tag查询对应的具体的消费服务
     *
     * @param topic
     * @param tag
     * @return
     */
    private MQMsgProcessor selectConsumeService(String topic, String tag) {
        MQMsgProcessor imqMsgProcessor = null;
        for (Entry<String, MQMsgProcessor> entry : mqMsgProcessorServiceMap.entrySet()) {
            //获取service实现类上注解的topic和tags
            MQConsumeService consumeService = entry.getValue().getClass().getAnnotation(MQConsumeService.class);
            if (consumeService != null && consumeService.topic().getCode().equals(topic)) {
                String[] tagsArr = consumeService.tags();
                //"*"号表示订阅该主题下所有的tag
                if (("*").equals(tagsArr[0]) || Arrays.asList(tagsArr).contains(tag)) {
                    //获取该实例
                    imqMsgProcessor = entry.getValue();
                    break;
                }
            }
        }
        return imqMsgProcessor;
    }

}